/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.api.service.impl;

import com.chinamobile.cmss.lakehouse.api.dto.ClusterStorageUsage;
import com.chinamobile.cmss.lakehouse.api.dto.LakehouseInstanceBean;
import com.chinamobile.cmss.lakehouse.api.service.LakehouseInstanceService;
import com.chinamobile.cmss.lakehouse.common.Constants;
import com.chinamobile.cmss.lakehouse.common.dto.LakehouseCreateRequest;
import com.chinamobile.cmss.lakehouse.common.dto.LakehouseReleaseRequest;
import com.chinamobile.cmss.lakehouse.common.enums.ClusterStatusTypeEnum;
import com.chinamobile.cmss.lakehouse.common.enums.ClusterTypeEnum;
import com.chinamobile.cmss.lakehouse.common.enums.ResourceSizeEnum;
import com.chinamobile.cmss.lakehouse.common.enums.Status;
import com.chinamobile.cmss.lakehouse.common.kubernetes.ResourceSizeConfig;
import com.chinamobile.cmss.lakehouse.common.utils.BeanUtils;
import com.chinamobile.cmss.lakehouse.common.utils.ClusterUtil;
import com.chinamobile.cmss.lakehouse.common.utils.PageInfo;
import com.chinamobile.cmss.lakehouse.common.utils.ParameterUtils;
import com.chinamobile.cmss.lakehouse.common.utils.Result;
import com.chinamobile.cmss.lakehouse.common.utils.UUIDUtil;
import com.chinamobile.cmss.lakehouse.dao.EventDao;
import com.chinamobile.cmss.lakehouse.dao.HiveMetastoreConfigDao;
import com.chinamobile.cmss.lakehouse.dao.LakehouseClusterInfoDao;
import com.chinamobile.cmss.lakehouse.dao.SparkSQLTaskInfoDao;
import com.chinamobile.cmss.lakehouse.dao.UserAccessRecordDao;
import com.chinamobile.cmss.lakehouse.dao.entity.EngineTaskInfoEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.LakehouseClusterInfoEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.UserEntity;
import com.chinamobile.cmss.lakehouse.service.resource.LakehouseResourceService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.persistence.criteria.Predicate;

import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

@Service
@Slf4j
public class LakehouseInstanceServiceImpl extends BaseServiceImpl implements LakehouseInstanceService {

    @Autowired
    private LakehouseResourceService lakehouseResourceService;

    @Autowired
    LakehouseClusterInfoDao lakehouseClusterInfoDao;

    @Autowired
    UserAccessRecordDao userAccessRecordDao;

    @Autowired
    SparkSQLTaskInfoDao sparkSQLTaskInfoDao;

    // Task parallelism per compute resource size, indexed by ResourceSizeEnum ordinal.
    // NOTE(review): this couples the property's element order to the enum declaration
    // order — confirm when adding new ResourceSizeEnum constants.
    @Value("${resource.task.parallel:1,2,3,3}")
    Integer[] taskParallels;

    @Autowired
    HiveMetastoreConfigDao metastoreConfigDao;

    @Autowired
    private ResourceSizeConfig resourceConfig;

    @Autowired
    private EventDao eventDao;

    /**
     * Builds and persists a new lakehouse cluster entity in ACCEPTED state from the
     * creation request bean.
     *
     * @param userId                owner of the new instance
     * @param lakehouseInstanceBean creation parameters (instance name/id, resource size,
     *                              engine type, version, ...)
     * @return the persisted entity (flushed to the database)
     */
    private LakehouseClusterInfoEntity initLakehouseInstance(String userId, LakehouseInstanceBean lakehouseInstanceBean) {
        LakehouseClusterInfoEntity lakehouseClusterInfoEntity =
            BeanUtils.copyFrom(lakehouseInstanceBean, LakehouseClusterInfoEntity.class);

        lakehouseClusterInfoEntity.setClusterType(ClusterTypeEnum.LAKEHOUSE.toString());
        lakehouseClusterInfoEntity.setInstanceId(lakehouseInstanceBean.getInstanceId());
        lakehouseClusterInfoEntity.setInstance(lakehouseInstanceBean.getInstance());
        lakehouseClusterInfoEntity.setCreateUser(lakehouseInstanceBean.getUserName());
        lakehouseClusterInfoEntity.setCreateTime(new Date());

        ResourceSizeEnum resourcePoolSize = lakehouseInstanceBean.getComputeResourceSize();
        if (resourcePoolSize != null) {
            lakehouseClusterInfoEntity.setComputeResourceSize(resourcePoolSize);
            // Guard against a 'resource.task.parallel' list shorter than the number of
            // resource sizes; indexing unconditionally by ordinal could throw
            // ArrayIndexOutOfBoundsException on a misconfigured property.
            int sizeIndex = resourcePoolSize.ordinal();
            if (sizeIndex < taskParallels.length) {
                lakehouseClusterInfoEntity.setTaskParallelize(taskParallels[sizeIndex]);
            } else {
                log.warn("No task parallelism configured for resource size {}, using last configured value",
                    resourcePoolSize);
                lakehouseClusterInfoEntity.setTaskParallelize(taskParallels[taskParallels.length - 1]);
            }
        }

        lakehouseClusterInfoEntity.setStatus(ClusterStatusTypeEnum.ACCEPTED.getStatus());
        // Seed storage usage with an empty hourly map so later JSON parsing of this
        // column never has to deal with a null/absent structure.
        ClusterStorageUsage clusterStorageUsage = new ClusterStorageUsage();
        clusterStorageUsage.setHourStorageMap(new HashMap<>());
        lakehouseClusterInfoEntity.setStorageUsage(new Gson().toJson(clusterStorageUsage));
        lakehouseClusterInfoEntity.setUserId(userId);
        lakehouseClusterInfoEntity.setVersion(lakehouseInstanceBean.getVersion());
        lakehouseClusterInfoEntity.setEngineType(lakehouseInstanceBean.getEngineType());
        lakehouseClusterInfoDao.saveAndFlush(lakehouseClusterInfoEntity);
        return lakehouseClusterInfoEntity;
    }

    /**
     * Creates a lakehouse instance: persists the record, then kicks off asynchronous
     * resource creation and returns DEPLOYING immediately.
     *
     * @param userId                owner of the instance
     * @param lakehouseInstanceBean creation parameters
     * @return result map: LAKEHOUSE_EXIST if the name is taken, SUCCESS with
     *         DEPLOYING status for lakehouse-type requests, otherwise a create error
     */
    @Override
    public Map<String, Object> createLakehouseInstance(String userId, LakehouseInstanceBean lakehouseInstanceBean) {
        Map<String, Object> result = new HashMap<>();

        // Instance names must be unique per user.
        if (isInstanceNameExists(userId, lakehouseInstanceBean.getInstance())) {
            putMessage(result, Status.LAKEHOUSE_EXIST);
            return result;
        }

        LakehouseClusterInfoEntity instanceEntity = initLakehouseInstance(userId, lakehouseInstanceBean);

        if (ClusterTypeEnum.LAKEHOUSE.toString().equalsIgnoreCase(lakehouseInstanceBean.getClusterType())) {
            LakehouseCreateRequest createRequest = LakehouseCreateRequest.builder()
                .instanceId(instanceEntity.getInstanceId()).createUser(lakehouseInstanceBean.getUserName())
                .instance(lakehouseInstanceBean.getInstance())
                .resouceSize(lakehouseInstanceBean.getComputeResourceSize())
                .type(ClusterTypeEnum.LAKEHOUSE).engineType(lakehouseInstanceBean.getEngineType()).build();
            log.info("create cluster...{}", createRequest);
            // call async create and return successful status directly
            lakehouseResourceService.createLakehouseResourceAsync(createRequest);
            result.put(Constants.DATA_LIST, ClusterStatusTypeEnum.DEPLOYING);
            putMessage(result, Status.SUCCESS);
        } else {
            // Only LAKEHOUSE-type clusters are supported by this flow; the entity was
            // already persisted above but no resources are created for other types.
            putMessage(result, Status.LAKEHOUSE_RESOURCE_CREATE_ERROR);
        }

        return result;
    }

    /**
     * Releases a lakehouse instance: triggers asynchronous resource teardown, purges
     * its engine task records, and renames the entity (name + UUID suffix) so the
     * original instance name can be reused immediately.
     *
     * @param userId     requesting user (currently unused for authorization here)
     * @param instanceId id of the instance to release
     * @return result map: SUCCESS with RELEASING status, or an error status
     */
    @Override
    public Map<String, Object> cancelLakehouseInstance(String userId, String instanceId) {
        Map<String, Object> result = new HashMap<>();
        LakehouseClusterInfoEntity lakehouseClusterInfoEntity = lakehouseClusterInfoDao.findByInstanceId(instanceId);
        if (lakehouseClusterInfoEntity == null) {
            log.error("Release instance failed, instance {} not exist", instanceId);
            putMessage(result, Status.LAKEHOUSE_INSTANCE_LIST_NULL_ERROR);
            return result;
        }

        if (ClusterTypeEnum.LAKEHOUSE.toString().equalsIgnoreCase(lakehouseClusterInfoEntity.getClusterType())) {
            LakehouseReleaseRequest lakehouseReleaseRequest = new LakehouseReleaseRequest();
            lakehouseReleaseRequest.setInstanceId(lakehouseClusterInfoEntity.getInstanceId());
            lakehouseReleaseRequest.setInstance(lakehouseClusterInfoEntity.getInstance());
            lakehouseReleaseRequest.setCreateUser(lakehouseClusterInfoEntity.getCreateUser());
            lakehouseReleaseRequest.setEngineType(lakehouseClusterInfoEntity.getEngineType());
            log.info("release cluster...{}", lakehouseReleaseRequest);
            lakehouseClusterInfoDao.updateStatus(instanceId, ClusterStatusTypeEnum.RELEASING.getStatus());
            // call async release
            lakehouseResourceService.releaseLakehouseResourceAsync(lakehouseReleaseRequest);
            // clean engine_task_info
            sparkSQLTaskInfoDao.deleteAllByInstance(lakehouseClusterInfoEntity.getInstance());
            // update status
            // NOTE(review): the save below overwrites the RELEASING status written just
            // above with RELEASED before the async release completes — confirm this
            // immediate transition is intentional.
            lakehouseClusterInfoEntity.setStatus(ClusterStatusTypeEnum.RELEASED.getStatus());
            lakehouseClusterInfoEntity.setUpdateTime(new Date());

            // generate instance name by uuid
            String instanceName = lakehouseClusterInfoEntity.getInstance();
            lakehouseClusterInfoEntity.setInstance(instanceName + "-" + UUIDUtil.generateUuid());
            lakehouseClusterInfoDao.save(lakehouseClusterInfoEntity);
            // return successful status directly
            result.put(Constants.DATA_LIST, ClusterStatusTypeEnum.RELEASING);
            putMessage(result, Status.SUCCESS);
        } else {
            putMessage(result, Status.LAKEHOUSE_RESOURCE_RELEASE_ERROR);
        }

        return result;
    }

    /**
     * Checks whether the given user already owns an instance with this name.
     *
     * @param userId   owner to check under
     * @param instance instance name
     * @return true if a matching record exists
     */
    @Override
    public boolean isInstanceNameExists(String userId, String instance) {
        return lakehouseClusterInfoDao.findByUserIdAndInstance(userId, instance) != null;
    }

    /**
     * Lists a user's instances of the given engine type, excluding instances that are
     * releasing, released, or failed to create.
     *
     * @param userId     owner whose instances are listed
     * @param engineType engine type filter
     * @return result map with the filtered entity list under DATA_LIST
     */
    @Override
    public Map<String, Object> getInstanceList(String userId, String engineType) {
        Map<String, Object> result = new HashMap<>();
        List<LakehouseClusterInfoEntity> clusterInfoEntityList = new ArrayList<>();

        log.info("user {} get instanceList for {} type.", userId, engineType);
        List<LakehouseClusterInfoEntity> ownerClusterInfoEntityList =
            lakehouseClusterInfoDao.findAllByUserIdAndEngineType(userId, engineType);

        if (!CollectionUtils.isEmpty(ownerClusterInfoEntityList)) {
            for (LakehouseClusterInfoEntity lakehouseClusterInfoEntity : ownerClusterInfoEntityList) {
                LakehouseClusterInfoEntity lakehouseInstanceBean =
                    BeanUtils.copyFrom(lakehouseClusterInfoEntity, LakehouseClusterInfoEntity.class);
                // Constant-first equals so a null status in the database cannot NPE;
                // a null status simply falls through and is included.
                String status = lakehouseInstanceBean.getStatus();
                if (ClusterStatusTypeEnum.RELEASING.getStatus().equals(status)
                    || ClusterStatusTypeEnum.RELEASED.getStatus().equals(status)
                    || ClusterStatusTypeEnum.CREATE_FAILED.getStatus().equals(status)) {
                    continue;
                }
                clusterInfoEntityList.add(lakehouseInstanceBean);
            }
        }
        result.put(Constants.DATA_LIST, clusterInfoEntityList);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Lists the names of a user's visible instances of the given engine type.
     *
     * @param userId     owner whose instances are listed
     * @param engineType engine type filter
     * @return result map with the instance-name list under DATA_LIST
     */
    @Override
    @SuppressWarnings("unchecked")
    public Map<String, Object> getInstanceNames(String userId, String engineType) {
        Map<String, Object> result = new HashMap<>();
        // getInstanceList always stores a List<LakehouseClusterInfoEntity> under
        // DATA_LIST, so the cast is safe.
        List<LakehouseClusterInfoEntity> instanceList = (List<LakehouseClusterInfoEntity>) getInstanceList(userId, engineType).get(Constants.DATA_LIST);
        List<String> instanceNameList = instanceList.stream().map(LakehouseClusterInfoEntity::getInstance).collect(Collectors.toList());
        result.put(Constants.DATA_LIST, instanceNameList);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Probes the health of each instance's Kubernetes namespace.
     *
     * @param clusterInfoEntityList instances to probe (may be empty or null)
     * @return map of namespace name to health flag; a probe failure counts as unhealthy
     */
    @Override
    public Map<String, Object> getInstanceListHealthStatus(List<LakehouseClusterInfoEntity> clusterInfoEntityList) {
        Map<String, Object> result = new HashMap<>();
        if (CollectionUtils.isEmpty(clusterInfoEntityList)) {
            return result;
        }
        List<String> namespaceList = new ArrayList<>();
        clusterInfoEntityList.forEach(infoEntity -> {
            String namespaceName = ClusterUtil.convertCluster2Namespace(infoEntity.getInstance());
            namespaceList.add(namespaceName);
        });
        for (String namespace : namespaceList) {
            boolean healthy = false;
            try {
                healthy = lakehouseResourceService.checkClusterHealthStatus(namespace);
            } catch (Exception e) {
                // Attach the exception so the failure cause is not lost; the namespace
                // is still reported as unhealthy rather than aborting the whole scan.
                log.warn("Failed to check cluster health status for namespace {}!", namespace, e);
            }
            result.put(namespace, healthy);
        }
        return result;
    }

    /**
     * Bulk-updates the status of the given instances.
     *
     * @param status                new status value
     * @param clusterInfoEntityList instances to update (no-op if empty or null)
     */
    @Override
    public void updateInstanceListStatus(String status, List<LakehouseClusterInfoEntity> clusterInfoEntityList) {
        if (CollectionUtils.isEmpty(clusterInfoEntityList)) {
            return;
        }
        List<String> instanceIdList = clusterInfoEntityList.stream()
            .map(LakehouseClusterInfoEntity::getInstanceId)
            .collect(Collectors.toList());
        lakehouseClusterInfoDao.updateListStatus(status, instanceIdList);
    }

    /**
     * Returns a page of instances for the console, newest first, optionally filtered
     * by a name search. Admins see all users' instances; others only their own.
     *
     * @param loginUser requesting user
     * @param searchVal optional substring filter on the instance name
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @return Result wrapping a PageInfo of LakehouseInstanceBean
     */
    @Override
    public Result getInstanceList(UserEntity loginUser, String searchVal, Integer pageNo, Integer pageSize) {
        Result<Object> result = new Result<>();
        PageRequest pageRequest = PageRequest.of(pageNo - 1, pageSize, Sort.Direction.DESC, "createTime");
        Specification<LakehouseClusterInfoEntity> specification = buildSpecification(searchVal, isAdmin(loginUser) ? null : loginUser.getUserId());
        Page<LakehouseClusterInfoEntity> page = lakehouseClusterInfoDao.findAll(specification, pageRequest);
        ArrayList<LakehouseInstanceBean> lakehouseInstanceBeans = new ArrayList<>();
        for (LakehouseClusterInfoEntity lakehouseClusterInfoEntity : page.getContent()) {
            LakehouseInstanceBean lakehouseInstanceBean =
                BeanUtils.copyFrom(lakehouseClusterInfoEntity, LakehouseInstanceBean.class);
            lakehouseInstanceBean.setTaskRunningTime(getInstanceRunningTime(lakehouseInstanceBean));
            lakehouseInstanceBean.setCreatedBy(lakehouseClusterInfoEntity.getCreateUser());
            lakehouseInstanceBeans.add(lakehouseInstanceBean);
        }
        PageInfo<LakehouseInstanceBean> pageInfo = new PageInfo<>(pageNo, pageSize);
        pageInfo.setTotalList(lakehouseInstanceBeans);
        pageInfo.setTotal((int) page.getTotalElements());
        result.setData(pageInfo);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Fetches a single instance's details, including its current storage usage parsed
     * from the JSON storage-usage column (0 when absent or unparseable).
     *
     * @param loginUser  requesting user; must own the instance or be an admin
     * @param instanceId id of the instance to fetch
     * @return result map with the bean under DATA_LIST, or an error status
     */
    @Override
    public Map<String, Object> getInstance(UserEntity loginUser, String instanceId) {
        Map<String, Object> result = new HashMap<>();
        LakehouseClusterInfoEntity lakehouseClusterInfoEntity = lakehouseClusterInfoDao.findByInstanceId(instanceId);
        if (lakehouseClusterInfoEntity == null) {
            log.error("get instance failed, instance {} not exist", instanceId);
            putMessage(result, Status.LAKEHOUSE_INSTANCE_LIST_NULL_ERROR);
            return result;
        }
        if (!loginUser.getUserId().equals(lakehouseClusterInfoEntity.getUserId()) && !isAdmin(loginUser)) {
            log.error("get instance {} failed, user verification failed", instanceId);
            putMessage(result, Status.USER_NO_OPERATION_PERM);
            return result;
        }
        LakehouseInstanceBean lakehouseInstanceBean =
            BeanUtils.copyFrom(lakehouseClusterInfoEntity, LakehouseInstanceBean.class);
        lakehouseInstanceBean.setCreatedBy(lakehouseClusterInfoEntity.getCreateUser());
        try {
            Gson gson = new Gson();
            ClusterStorageUsage clusterStorageUsage =
                gson.fromJson(lakehouseClusterInfoEntity.getStorageUsage(), ClusterStorageUsage.class);
            if (clusterStorageUsage != null && clusterStorageUsage.getCurrentStorage() != null) {
                lakehouseInstanceBean.setStorageUsage(clusterStorageUsage.getCurrentStorage());
            } else {
                lakehouseInstanceBean.setStorageUsage(0L);
            }
        } catch (Exception e) {
            // Malformed JSON degrades to 0 usage; keep the exception for diagnosis.
            log.warn("Storage usage: {}, cannot cast to ClusterStorageUsage",
                lakehouseClusterInfoEntity.getStorageUsage(), e);
            lakehouseInstanceBean.setStorageUsage(0L);
        }
        result.put(Constants.DATA_LIST, lakehouseInstanceBean);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Builds the JPA specification for the paged instance listing: excludes released
     * instances, and optionally filters by name substring and owner.
     *
     * @param searchVal optional name substring (special characters escaped)
     * @param userId    optional owner filter; null means all users (admin view)
     * @return the composed specification
     */
    private Specification<LakehouseClusterInfoEntity> buildSpecification(String searchVal, String userId) {
        return (root, criteriaQuery, cBuilder) -> {
            //define a predicate
            Predicate p = cBuilder.conjunction();
            // NOTE(review): literal "released" — presumably matches
            // ClusterStatusTypeEnum.RELEASED.getStatus(); confirm and prefer the enum.
            p = cBuilder.and(p, cBuilder.notEqual(root.get("status"), "released"));
            if (StringUtils.isNotBlank(searchVal)) {
                p = cBuilder.and(p, cBuilder.like(root.get("instance"), "%" + ParameterUtils.replaceSpecialChars(searchVal) + "%"));
            }
            if (StringUtils.isNotBlank(userId)) {
                p = cBuilder.and(p, cBuilder.equal(root.get("userId"), userId));
            }
            return p;
        };
    }

    /**
     * Total wall-clock seconds during which at least one finished task of this
     * instance was running (overlapping tasks counted once).
     *
     * @param warpdriveInstanceBean instance whose finished tasks are summed
     * @return busy time in seconds
     */
    private long getInstanceRunningTime(LakehouseInstanceBean warpdriveInstanceBean) {
        List<EngineTaskInfoEntity> taskInfoEntityList =
            sparkSQLTaskInfoDao.findFinishedTasks(warpdriveInstanceBean.getInstance());
        long lastTime = lastTimeInTasks(taskInfoEntityList);
        return lastTime / 1000;
    }

    /**
     * Sweep-line union of task time intervals: total milliseconds covered by at least
     * one [submitTime, finishTime] interval. Entries with a null submit or finish time
     * contribute only their non-null endpoint.
     *
     * @param taskInfoEntityList tasks to merge (null or empty yields 0)
     * @return covered time in milliseconds
     */
    public long lastTimeInTasks(List<EngineTaskInfoEntity> taskInfoEntityList) {
        if (taskInfoEntityList == null || taskInfoEntityList.isEmpty()) {
            return 0;
        }
        // 1. collect the boundary points; each start contributes +1, each end -1,
        //    accumulated per distinct timestamp in 'values'
        List<Long> points = new ArrayList<>();
        Map<Long, Integer> values = new HashMap<>();
        long lastTime = 0;
        for (EngineTaskInfoEntity sparkSQLTaskInfoPO : taskInfoEntityList) {
            Date submitTime = sparkSQLTaskInfoPO.getSubmitTime();
            if (submitTime != null) {
                Long taskStartTime = submitTime.getTime();
                Integer value = values.get(taskStartTime);
                if (value == null) {
                    points.add(taskStartTime);
                    values.put(taskStartTime, 1);
                } else {
                    values.put(taskStartTime, value + 1);
                }
            }

            Date finishTime = sparkSQLTaskInfoPO.getFinishTime();
            if (finishTime != null) {
                Long taskEndTime = finishTime.getTime();
                Integer value = values.get(taskEndTime);
                if (value == null) {
                    points.add(taskEndTime);
                    values.put(taskEndTime, -1);
                } else {
                    values.put(taskEndTime, value - 1);
                }
            }
        }
        // Every entity may have null submit AND finish times, leaving no points;
        // the original then threw IndexOutOfBoundsException on points.get(0).
        if (points.isEmpty()) {
            return 0;
        }
        // 2. sort the boundary points chronologically
        Collections.sort(points);
        // 3. sweep: while the running-interval count is positive, the gap to the
        //    next point is covered time
        Long oldTime = points.get(0);
        int sumValue = 0;
        for (Long point : points) {
            Integer value = values.get(point);
            if (value != null) {
                if (sumValue > 0) {
                    lastTime += point - oldTime;
                }
                oldTime = point;
                sumValue += value;
            }
        }
        return lastTime;
    }

}
